fix: race-safe asyncSem, safe type assertion, atomic shouldNotify#21
fix: race-safe asyncSem, safe type assertion, atomic shouldNotify#21
Conversation
…ouldNotify
- Replace bare asyncSem channel variable with atomic.Pointer[chan struct{}]
to eliminate data race between SetMaxAsyncNotifications and NotifyAsync.
Retain sync.Once guard to prevent repeated channel replacement.
Reduce default concurrency from 1000 to 20.
- Use comma-ok type assertion in GetTraceId() to prevent panic when
tracerID option holds a non-string value.
- Convert customError.shouldNotify from bool to atomic.Bool to fix
data race between concurrent ShouldNotify/Notified calls in
NotifyAsync goroutines.
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
📝 WalkthroughWalkthroughSwitches an internal error notify flag to atomic.Bool, makes the notifier's async semaphore an atomic.Pointer to a channel with a single-init default capacity of 20, hardens GetTraceId against non-string values, and adds notifier tests for trace ID and semaphore behavior. Changes
Sequence Diagram(s)sequenceDiagram
participant Caller
participant Notifier
participant Sem as "Semaphore (chan struct{})"
Caller->>Notifier: NotifyAsync(ctx, ...)
Notifier->>Notifier: semPtr := asyncSem.Load()
Notifier->>Sem: sem := *semPtr
Notifier->>Sem: try send token (non-blocking)
alt token acquired
Notifier->>Notifier: spawn goroutine to perform notification
Notifier->>Sem: receive token (release) when done
else no token
Notifier-->>Caller: return immediately (dropped)
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
🧹 Nitpick comments (3)
notifier/notifier_test.go (2)
42-61: Test is valid for race detection despitesync.Oncesemantics.While 99 of the 100
SetMaxAsyncNotificationscalls are no-ops due tosync.Once, this test remains effective for race detection: the first call performs an atomic store whileNotifyAsyncgoroutines perform atomic loads concurrently. Running with-racewill validate the atomic operations work correctly.Consider adding a brief comment noting that only the first call mutates state, to clarify why the loop is still useful for race verification.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@notifier/notifier_test.go` around lines 42 - 61, Add a short clarifying comment inside TestSetMaxAsyncNotifications_ConcurrentAccess explaining that although SetMaxAsyncNotifications uses sync.Once so 99 of the 100 calls are no-ops, the first call mutates asyncSem and subsequent NotifyAsync calls concurrently perform atomic loads, so the loop is still needed to exercise the concurrent store/load behavior; reference the functions SetMaxAsyncNotifications, NotifyAsync and the asyncSem variable in that comment for clarity.
33-40: Test is a smoke test, not a concurrency verification.This test calls
NotifyAsyncsequentially and only verifies no panic occurs. It doesn't actually validate bounded concurrency (e.g., that at most N goroutines run simultaneously). Consider renaming toTestNotifyAsync_NoPanicor adding actual concurrency assertions if bounded behavior verification is desired.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@notifier/notifier_test.go` around lines 33 - 40, The test TestNotifyAsync_BoundedConcurrency is only a sequential smoke test calling NotifyAsync and doesn't assert bounded concurrency; either rename it to TestNotifyAsync_NoPanic to reflect its purpose, or change it to actually verify concurrency by invoking NotifyAsync concurrently (e.g., spawn many goroutines that call NotifyAsync) and track the active concurrent workers with a shared counter protected by a mutex or an instrumentation hook into the notifier (or by injecting a test semaphore channel) to assert the active count never exceeds the expected limit; update references to the test name or add the concurrency assertions around NotifyAsync accordingly.notifier/notifier.go (1)
59-71: Consider clarifying the single-call constraint.The
sync.Oncewrapper ensures race safety but also means only the first call toSetMaxAsyncNotificationsafter package init takes effect. Ifinit()has already set the default (20), then the first user call to this function will successfully change the limit ton, but any subsequent calls are silently ignored.This could surprise callers who attempt runtime reconfiguration. The docstring documents this, but consider whether a runtime-adjustable limit is desirable or if blocking subsequent calls with an explicit log/warning would improve debuggability.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@notifier/notifier.go` around lines 59 - 71, SetMaxAsyncNotifications currently uses asyncSemOnce.Do and silently ignores any calls after the first (even if init already set the default), which can surprise callers; update SetMaxAsyncNotifications to detect when asyncSem is already initialized (read asyncSem via asyncSem.Load()) and, if a subsequent call is made with a different n, emit a clear warning/debug log mentioning SetMaxAsyncNotifications and the current vs requested value (using your package logger or the standard log), while preserving the existing safety of asyncSemOnce and not changing initialization semantics; reference asyncSemOnce, asyncSem, and SetMaxAsyncNotifications so reviewers can find and add the check-and-log before or around the existing asyncSemOnce.Do call.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@notifier/notifier_test.go`:
- Around line 42-61: Add a short clarifying comment inside
TestSetMaxAsyncNotifications_ConcurrentAccess explaining that although
SetMaxAsyncNotifications uses sync.Once so 99 of the 100 calls are no-ops, the
first call mutates asyncSem and subsequent NotifyAsync calls concurrently
perform atomic loads, so the loop is still needed to exercise the concurrent
store/load behavior; reference the functions SetMaxAsyncNotifications,
NotifyAsync and the asyncSem variable in that comment for clarity.
- Around line 33-40: The test TestNotifyAsync_BoundedConcurrency is only a
sequential smoke test calling NotifyAsync and doesn't assert bounded
concurrency; either rename it to TestNotifyAsync_NoPanic to reflect its purpose,
or change it to actually verify concurrency by invoking NotifyAsync concurrently
(e.g., spawn many goroutines that call NotifyAsync) and track the active
concurrent workers with a shared counter protected by a mutex or an
instrumentation hook into the notifier (or by injecting a test semaphore
channel) to assert the active count never exceeds the expected limit; update
references to the test name or add the concurrency assertions around NotifyAsync
accordingly.
In `@notifier/notifier.go`:
- Around line 59-71: SetMaxAsyncNotifications currently uses asyncSemOnce.Do and
silently ignores any calls after the first (even if init already set the
default), which can surprise callers; update SetMaxAsyncNotifications to detect
when asyncSem is already initialized (read asyncSem via asyncSem.Load()) and, if
a subsequent call is made with a different n, emit a clear warning/debug log
mentioning SetMaxAsyncNotifications and the current vs requested value (using
your package logger or the standard log), while preserving the existing safety
of asyncSemOnce and not changing initialization semantics; reference
asyncSemOnce, asyncSem, and SetMaxAsyncNotifications so reviewers can find and
add the check-and-log before or around the existing asyncSemOnce.Do call.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 03ef1c7a-7770-4f14-bbc8-2237e7938cd5
📒 Files selected for processing (3)
errors.gonotifier/notifier.gonotifier/notifier_test.go
There was a problem hiding this comment.
Pull request overview
This PR addresses several concurrency-safety issues and a panic-risk in trace ID retrieval, primarily in the notifier and error-wrapping flows.
Changes:
- Make
NotifyAsyncconcurrency limiting race-safe by storing the semaphore channel in anatomic.Pointer, and lower the default async notification concurrency limit to 20. - Prevent
GetTraceId()from panicking by switching to a comma-ok type assertion when reading thetracerIdoption. - Fix a data race on
customError.shouldNotifyby converting it frombooltoatomic.Bool, and add regression tests.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| notifier/notifier.go | Introduces atomic storage for the async semaphore channel, updates default concurrency, and hardens GetTraceId type handling. |
| notifier/notifier_test.go | Adds regression tests for trace ID type assertion and concurrent access to async notification configuration. |
| errors.go | Replaces customError.shouldNotify with atomic.Bool and updates construction/accessors to be race-safe. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
- Update SetMaxAsyncNotifications doc to reflect race-safe behavior - Rewrite TestNotifyAsync_BoundedConcurrency to actually assert drops when semaphore is full - Remove outdated comment about pre-existing shouldNotify race
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // Now the semaphore is full. Additional calls should be dropped. | ||
| var dropped atomic.Int32 | ||
| originalDebug := NotifyAsync(errors.New("should-drop")) | ||
| // NotifyAsync returns the error regardless of drop/send, so we can't | ||
| // check the return value. Instead, verify the semaphore is still full | ||
| // by checking we can't send another token. | ||
| select { | ||
| case ch <- struct{}{}: | ||
| // We could send — means the slot was free, which means the previous | ||
| // call was dropped (it didn't acquire). That's the expected path. | ||
| <-ch // put it back | ||
| dropped.Add(1) | ||
| default: | ||
| // Slot is full — the previous NotifyAsync got in, which shouldn't | ||
| // happen since we already filled it. This is also fine if timing | ||
| // allowed the blocker to finish. | ||
| } |
There was a problem hiding this comment.
TestNotifyAsync_BoundedConcurrency is currently non-asserting: the select treats both the "could send" and "default" branches as acceptable, so the test will pass regardless of whether NotifyAsync actually drops when the semaphore is full. Please make the test deterministic by forcing the semaphore to be full (e.g., pre-fill the channel) and asserting the drop path (e.g., channel remains full / no token acquired).
There was a problem hiding this comment.
Fixed in 31fc081 — rewrote the test to pre-fill the semaphore channel directly (no timing). Asserts len(ch) == cap(ch) after NotifyAsync to verify the drop path was taken.
| // Fill the single slot with a blocking goroutine. | ||
| block := make(chan struct{}) | ||
| blockErr := errors.New("blocker") | ||
| NotifyAsync(blockErr) // takes the one slot | ||
| // Give the goroutine a moment to acquire the semaphore token. | ||
| time.Sleep(10 * time.Millisecond) | ||
|
|
There was a problem hiding this comment.
The "blocking goroutine" setup doesn’t actually block anything: block is never observed by the async notification goroutine, so the semaphore token is likely released immediately. Relying on time.Sleep here makes the test flaky; prefer a synchronization mechanism that guarantees the token is held (or just pre-fill the semaphore channel to simulate a full pool).
There was a problem hiding this comment.
Fixed in 31fc081 — removed the blocking goroutine approach entirely. Now pre-fills the channel directly, making the test deterministic with no time.Sleep.
| // Drain any tokens left by test goroutines. | ||
| select { | ||
| case <-ch: | ||
| default: | ||
| } |
There was a problem hiding this comment.
The cleanup drains a token from ch without ensuring all NotifyAsync goroutines have finished. If a goroutine is still running, draining its token can cause its deferred <-s release to block forever. Consider avoiding the drain entirely, or synchronizing on goroutine completion before manipulating the semaphore channel.
| // Drain any tokens left by test goroutines. | |
| select { | |
| case <-ch: | |
| default: | |
| } |
There was a problem hiding this comment.
Fixed in 31fc081 — cleanup now drains with a for-len loop, and since we pre-fill rather than spawning goroutines, there's no risk of blocking a deferred release.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| func TestNotifyAsync_BoundedConcurrency(t *testing.T) { | ||
| // Set a tiny semaphore so we can observe drops. | ||
| ch := make(chan struct{}, 1) | ||
| asyncSem.Store(&ch) | ||
| t.Cleanup(func() { | ||
| // Drain any tokens left by test goroutines. | ||
| select { | ||
| case <-ch: | ||
| default: | ||
| } | ||
| // Restore default. | ||
| def := make(chan struct{}, 20) | ||
| asyncSem.Store(&def) | ||
| }) | ||
|
|
||
| // Fill the single slot with a blocking goroutine. | ||
| block := make(chan struct{}) | ||
| blockErr := errors.New("blocker") | ||
| NotifyAsync(blockErr) // takes the one slot | ||
| // Give the goroutine a moment to acquire the semaphore token. | ||
| time.Sleep(10 * time.Millisecond) | ||
|
|
||
| // Now the semaphore is full. Additional calls should be dropped. | ||
| var dropped atomic.Int32 | ||
| originalDebug := NotifyAsync(errors.New("should-drop")) | ||
| // NotifyAsync returns the error regardless of drop/send, so we can't | ||
| // check the return value. Instead, verify the semaphore is still full | ||
| // by checking we can't send another token. | ||
| select { | ||
| case ch <- struct{}{}: | ||
| // We could send — means the slot was free, which means the previous | ||
| // call was dropped (it didn't acquire). That's the expected path. | ||
| <-ch // put it back | ||
| dropped.Add(1) | ||
| default: | ||
| // Slot is full — the previous NotifyAsync got in, which shouldn't | ||
| // happen since we already filled it. This is also fine if timing | ||
| // allowed the blocker to finish. | ||
| } | ||
| _ = originalDebug | ||
|
|
||
| // Unblock the first goroutine so it releases the token. | ||
| close(block) | ||
| // Wait a bit for cleanup. |
There was a problem hiding this comment.
TestNotifyAsync_BoundedConcurrency doesn’t actually ensure the semaphore slot is held: block is never used by the notify path, and the test accepts both outcomes in the select without asserting that any calls were dropped. As written, it can pass even if NotifyAsync ignores the semaphore. Consider using a custom error implementing errors.NotifyExt where Notified(true) blocks to hold the token, then assert subsequent NotifyAsync calls hit the drop path (and wait for goroutines to exit to avoid leaks).
| func TestNotifyAsync_BoundedConcurrency(t *testing.T) { | |
| // Set a tiny semaphore so we can observe drops. | |
| ch := make(chan struct{}, 1) | |
| asyncSem.Store(&ch) | |
| t.Cleanup(func() { | |
| // Drain any tokens left by test goroutines. | |
| select { | |
| case <-ch: | |
| default: | |
| } | |
| // Restore default. | |
| def := make(chan struct{}, 20) | |
| asyncSem.Store(&def) | |
| }) | |
| // Fill the single slot with a blocking goroutine. | |
| block := make(chan struct{}) | |
| blockErr := errors.New("blocker") | |
| NotifyAsync(blockErr) // takes the one slot | |
| // Give the goroutine a moment to acquire the semaphore token. | |
| time.Sleep(10 * time.Millisecond) | |
| // Now the semaphore is full. Additional calls should be dropped. | |
| var dropped atomic.Int32 | |
| originalDebug := NotifyAsync(errors.New("should-drop")) | |
| // NotifyAsync returns the error regardless of drop/send, so we can't | |
| // check the return value. Instead, verify the semaphore is still full | |
| // by checking we can't send another token. | |
| select { | |
| case ch <- struct{}{}: | |
| // We could send — means the slot was free, which means the previous | |
| // call was dropped (it didn't acquire). That's the expected path. | |
| <-ch // put it back | |
| dropped.Add(1) | |
| default: | |
| // Slot is full — the previous NotifyAsync got in, which shouldn't | |
| // happen since we already filled it. This is also fine if timing | |
| // allowed the blocker to finish. | |
| } | |
| _ = originalDebug | |
| // Unblock the first goroutine so it releases the token. | |
| close(block) | |
| // Wait a bit for cleanup. | |
| // testNotifyErr implements errors.NotifyExt and allows the test to | |
| // distinguish between accepted and dropped async notifications. | |
| type testNotifyErr struct { | |
| msg string | |
| block <-chan struct{} | |
| dropped *atomic.Int32 | |
| } | |
| func (e *testNotifyErr) Error() string { | |
| return e.msg | |
| } | |
| // Notified is called by NotifyAsync. When async is true, the notification | |
| // was accepted and we block to hold the semaphore token. When async is | |
| // false, the notification was dropped and we record that fact. | |
| func (e *testNotifyErr) Notified(async bool) { | |
| if async { | |
| // Hold the token until the test closes e.block. | |
| <-e.block | |
| return | |
| } | |
| if e.dropped != nil { | |
| e.dropped.Add(1) | |
| } | |
| } | |
| func TestNotifyAsync_BoundedConcurrency(t *testing.T) { | |
| // Set a tiny semaphore so we can observe drops. | |
| ch := make(chan struct{}, 1) | |
| asyncSem.Store(&ch) | |
| t.Cleanup(func() { | |
| // Drain any tokens left by test goroutines. | |
| for { | |
| select { | |
| case <-ch: | |
| default: | |
| goto drained | |
| } | |
| } | |
| drained: | |
| // Restore default. | |
| def := make(chan struct{}, 20) | |
| asyncSem.Store(&def) | |
| }) | |
| // Fill the single slot with a blocking notification. | |
| block := make(chan struct{}) | |
| var dropped atomic.Int32 | |
| holder := &testNotifyErr{ | |
| msg: "holder", | |
| block: block, | |
| dropped: &dropped, | |
| } | |
| NotifyAsync(holder) // should take the one slot and block in Notified(true) | |
| // Give the goroutine a moment to acquire the semaphore token. | |
| time.Sleep(10 * time.Millisecond) | |
| // Now the semaphore is full. Additional calls should be dropped and | |
| // reported via Notified(false). | |
| for i := 0; i < 5; i++ { | |
| NotifyAsync(&testNotifyErr{ | |
| msg: "should-drop", | |
| block: block, | |
| dropped: &dropped, | |
| }) | |
| } | |
| // Allow callbacks to run. | |
| time.Sleep(50 * time.Millisecond) | |
| if got := dropped.Load(); got == 0 { | |
| t.Fatalf("expected some async notifications to be dropped while semaphore slot held, got %d drops", got) | |
| } | |
| // Unblock the goroutines so they can release the token and exit. | |
| close(block) |
There was a problem hiding this comment.
Fixed in 31fc081 — test now pre-fills the 1-slot semaphore and asserts len(ch) == cap(ch) after NotifyAsync.
Pre-fill the semaphore channel directly instead of relying on timing. Assert that len(ch) == cap(ch) after NotifyAsync to verify the drop path was taken. No time.Sleep, no unused channels, no flaky cleanup.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated no new comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Use ^$$ so Make passes literal ^$ to the shell, ensuring go test -run matches no test names (benchmarks only).
Summary
asyncSemchannel variable withatomic.Pointer[chan struct{}]to eliminate data race betweenSetMaxAsyncNotificationsandNotifyAsyncgoroutines. Retainsync.Onceguard to prevent repeated channel replacement. Reduce default concurrency from 1000 to 20.GetTraceId()to prevent panic whentracerIDoption holds a non-string value.customError.shouldNotifyfrombooltoatomic.Boolto fix data race between concurrentShouldNotify/Notifiedcalls inNotifyAsyncgoroutines.Test plan
go test -race ./...passes (errors + notifier packages)TestGetTraceId_NonStringValue,TestGetTraceId_StringValue,TestNotifyAsync_BoundedConcurrency,TestSetMaxAsyncNotifications_ConcurrentAccessSummary by CodeRabbit
Bug Fixes
Tests
Chores